Fanount 基础  (没有路由Key))


SpringBoot+RabbitMQ的简单实现之Fanout模式
1.在pom中添加springboot对amqp的支持
1 2 3 4
   | <dependency> 	<groupId>org.springframework.boot</groupId> 	<artifactId>spring-boot-starter-amqp</artifactId> </dependency>
   | 
 
2.在application.properties中添加RabbitMQ的简单配置信息
1 2 3 4 5
   | spring.rabbitmq.host=127.0.0.1 #5672是发送消息端口,15672是管理端的端口 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
   | 
 
3.配置Queue(消息队列)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
   | @Configuration public class QueueConfig {   	@Bean(name = "queue-fanoutA") 	public Queue queue_fanoutA() { 		return new Queue("queue-fanoutA"); 	}   	@Bean(name = "queue-fanoutB") 	public Queue queue-fanoutB() { 		return new Queue("queue-fanoutB"); 	}   	@Bean(name = "queue-fanoutC") 	public Queue queue-fanoutC() { 		return new Queue("queue-fanoutC"); 	}
     	@Bean 	public FanoutExchange fanoutExchange() { 		return new FanoutExchange("fanout_exchange"); 	}
     	@Bean 	Binding bindingExchangeFanoutA(@Qualifier("queue-fanoutA") Queue queue-fanoutA, FanoutExchange fanoutExchange) { 		return BindingBuilder.bind(queue-fanoutA).to(fanoutExchange); 	}
       	@Bean 	Binding bindingExchangeFanoutB(@Qualifier("queue-fanoutB") Queue queue-fanoutB, FanoutExchange fanoutExchange) { 		return BindingBuilder.bind(queue-fanoutB).to(fanoutExchange); 	}    	@Bean 	Binding bindingExchangeFanoutC(@Qualifier("queue-fanoutC") Queue queue-fanoutC, FanoutExchange fanoutExchange) { 		return BindingBuilder.bind(queue-fanoutC).to(fanoutExchange); 	}
  }
   | 
 
4.编写消息生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
   | @Component public class Sender_Fanout {
  	@Autowired 	private RabbitTemplate rabbitTemplate;          
 
 
 
 
 
      public void send(String exchangeName,String routingKey,Message message) {            	rabbitTemplate.convertAndSend(exchangeName,null,message);     } }
   | 
 
5.编写消息消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
   | @Component public class Receive_Fanout {
             @RabbitListener(queues="queue-fanoutA")     public void processA(Message message) throws UnsupportedEncodingException {     	MessageProperties messageProperties = message.getMessageProperties();     	     	String contentType = messageProperties.getContentType();     	         System.out.println("Receive-FanoutA:"+new String(message.getBody(), contentType));     }                           @RabbitListener(queues="queue-fanoutB")     public void processB(Message message) throws UnsupportedEncodingException {     	MessageProperties messageProperties = message.getMessageProperties();     	     	String contentType = messageProperties.getContentType();     	         System.out.println("Receive-FanoutB:"+new String(message.getBody(), contentType));     }                         @RabbitListener(queues="queue-fanoutC")     public void processC(Message message) throws UnsupportedEncodingException {     	MessageProperties messageProperties = message.getMessageProperties();     	     	String contentType = messageProperties.getContentType();     	         System.out.println("Receive-FanoutC:"+new String(message.getBody(), contentType));     } }
   | 
 
Tsst
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
   | @RunWith(SpringRunner.class) @SpringBootTest public class TestRabbitMQ_Fanout {          @Autowired     private Sender_Fanout sender_Fanout;           @Test     public void testRabbit_Fanout() {     
 
          MessageProperties messageProperties = new MessageProperties(); 	         	        messageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); 	        messageProperties.setContentType("UTF-8");                  Message message = new Message("hello,rabbit_topic!".getBytes(), messageProperties);              	sender_Fanout.send("fanout_exchange","",message);     } }
   | 
 
1 2 3 4 5 6
   |   @RabbitListener(bindings = @QueueBinding(value = @Queue("myQueuebingExchange"), exchange = @Exchange("myExchange")  ))  public void process(String message) {      log.info("message={}", message);  }
 
  | 
 


         
        
            
                
                    
                        Author:
                        John Doe
                    
                
                
                    
                        Permalink:
                        http://yoursite.com/2019/08/10/消息队列/RabbitMQ/RabbitMQ整合/Fanount 发布订阅模式/
                    
                
                
                    
                        License:
                        Copyright (c) 2019 CC-BY-NC-4.0 LICENSE
                    
                
                
                     
                         Slogan:
                         Do you believe in DESTINY?